Skip to content

Multiplex filtering#362

Open
amyjaynethompson wants to merge 9 commits into
mainfrom
multiplex_filtering
Open

Multiplex filtering#362
amyjaynethompson wants to merge 9 commits into
mainfrom
multiplex_filtering

Conversation

@amyjaynethompson
Copy link
Copy Markdown
Contributor

@amyjaynethompson amyjaynethompson commented Apr 28, 2026

xia2.multiplex has a filtering option built in which can greatly improve data reduction quality. VMXm, in particular, always manually reprocess datasets with xia2.multiplex to turn on these filtering parameters. Therefore, it would be nice to include this as a part of the auto processing infrastructure.

The issue has always been that the filtering can be slow, and this can impede rapid feedback. In xia2, we recently made a new command line program, xia2.multiplex_filtering. This performs the same filtering algorithms on a completed multiplex job. By breaking the algorithm into two separate programs, this would allow for rapid feedback as well as providing a filtered mtz later. This PR attempts to provide trigger/wrappers for such a filtering pipeline.

The cluster number is passed through from multiplex to multiplex_filtering to ensure that it is not triggered on clusters (possible implementation for clusters in the future, but would need slightly different triggering requirements).

As this pipeline relies on a finished multiplex directory (specific files needed that are not user-interesting), checks are done to make sure data is available where expected. This is done using the same delay multipliers as multiplex.

The sample group information is also passed through from multiplex. This is important, as there can be multiple sample groups related to a single DCID. Multiplex also passes through the actual DCID's it used in processing. This is also important, as the stored list of related DCID's can include both rotation/grid scans or other datasets that should not be used. Given all the relevant queries are already done in the multiplex trigger, it seemed easiest to pass these through rather than repeating all these queries.

The filtering itself is set to image_group mode, which means all the images are grouped into batches and a deltacchalf algorithm is used to see if any of these batches do not correlate well with the rest of the data. A group size of 50 is set as default, as this corresponds to 5deg rotation (following standard 0.1 deg fine slicing). However, VMXm have had success using a group size of 10, so they have this specified for their beamline.

General intent here is to test on VMXm first via staging, then roll it out live just for VMXm initially. This will be useful stress testing prior to deployment on other beam lines. Eventually, it is expected that this is triggered on all beam lines after multiplex.

NOTE: will need dials/latest to run -> this includes xia2.multiplex_filtering bug fixes which are not in the latest release.

@amyjaynethompson
Copy link
Copy Markdown
Contributor Author

Refactored the code so that a separate trigger function was no longer needed. The multiplex recipe has been updated so that a new output channel "filtering" is able to trigger xia2.multiplex_filtering. This saves moving parameters between multiplex and multiplex_filtering.

Comment thread src/dlstbx/services/trigger.py Outdated
Comment thread src/dlstbx/services/trigger.py Outdated
Comment thread src/dlstbx/services/trigger.py Outdated
Comment thread src/dlstbx/services/trigger.py Outdated
Comment thread src/dlstbx/services/trigger.py Outdated
Comment thread src/dlstbx/services/trigger.py Outdated
Comment thread src/dlstbx/services/trigger.py Outdated
Comment thread src/dlstbx/services/trigger.py Outdated
Comment thread src/dlstbx/services/trigger.py Outdated

# Place holder code for future iterations where may run filtering jobs on clusters

if cluster_num is not None:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it makes sense to retain cluster logic here (and possibly elsewhere in this wrapper). The flitering job is triggered as a separate job by the multiplex wrapper. With the way the recipe is structured, if you were running filtering on clusters, a separate call of the filtering wrapper would get made for each cluster so you wouldn't need the same logic that loops over and distinguishes between clusters and non-clusters.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense - I have removed the cluster logic from ispyb submission and when searching for the output file attachments.

Comment on lines +166 to +174
# ignore filtering parameters for xia2.multiplex_filtering
ignore = {
"sample_id",
"sample_group_id",
"filtering.method",
"deltacchalf.stdcutoff",
"deltacchalf.mode",
"deltacchalf.group_size",
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment here is misleadingly placed as it implies that sample_id and sample_group_id are flitering parameters.

It is also a bit clunky having to explicitly list parameters to ignore, though I appreciate this is due to how the multiplex job parameters are passed via the trigger service and might be hard to avoid.

Comment on lines +133 to +139
ignore = {
"sample_id",
"sample_group_id",
"data",
"clustering.method",
"clustering.output_clusters",
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, it will be a pain in the future if you have to add a parameter here every time you want to pass a new parameter to multiplex and not filtering via the trigger service.

Comment on lines +199 to +202
elif len(waiting_processing_files) > 0 and ntry >= backoff_max_try:
self.log.warning("Timed out waiting for xia2.multiplex files to copy.")
timedout = True
waiting = False
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
elif len(waiting_processing_files) > 0 and ntry >= backoff_max_try:
self.log.warning("Timed out waiting for xia2.multiplex files to copy.")
timedout = True
waiting = False
elif len(waiting_processing_files) > 0 and ntry >= backoff_max_try:
self.log.error("Timed out waiting for xia2.multiplex files to copy.")
return False

You can just return False here and remove the need for the timedout variable. Also this should log as an error instead of a warning.

if not mplx_file.is_file():
waiting_processing_files.append(mplx_file)
self.log.info(
f"Files still copying - {mplx_file} not yet present in {multiplex_dir}."
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mplx_file is the complete file path, including multiplex_dir so having both together in this log message is unnecessary. Either just include the file name (i.e. mplx_file.name) along with the directory or give the full file path but not the directory as well.

Logging individually for each file missing is also overkill. You could wait for the loop to finish and log the list of waiting_processing_files instead.

Comment on lines +181 to +206
ntry = 0
waiting = True
timedout = False
backoff_max_try = 10
backoff_multiplier = 2
backoff_delay = 8
while waiting:
waiting_processing_files = []
for mplx_file in needed_files:
if not mplx_file.is_file():
waiting_processing_files.append(mplx_file)
self.log.info(
f"Files still copying - {mplx_file} not yet present in {multiplex_dir}."
)
if len(waiting_processing_files) > 0 and ntry < backoff_max_try:
delay = int(backoff_delay * backoff_multiplier**ntry)
time.sleep(delay)
ntry += 1
elif len(waiting_processing_files) > 0 and ntry >= backoff_max_try:
self.log.warning("Timed out waiting for xia2.multiplex files to copy.")
timedout = True
waiting = False
else:
self.log.info("All files present for xia2.multiplex_filtering")
waiting = False

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be better to delegate all of this logic to the FileWatcher service. This way, you wouldn't be tying up resources on the cluster waiting for files to be copied across.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants